gRPC 学习之-高级知识
gRPC 学习通信基础
高级知识
之前的文章中记录了对 gRPC 的使用,但是都没有涉及底层的通信基础知识。这篇文章主要介绍 gRPC 的高级知识,涉及到 gRPC 的底层原理、
gRPC 底层原理
gRPC 超越基础知识
这部分主要讲述在构建真正的gRPC应用时,对于需要增强它们的各种能力,比如:拦截gRPC的输入和输出、弹性处理网络延迟、处理错误、在服务和消费者之间共享元数据。
拦截器
在 gRPC
中,可以拦截 gRPC
的执行.来满足特定的需求,如日志、认证、性能度盐指标等,这会使用一种名为拦截器的扩展机制。它是 gRPC
核心扩展机制之一 ,在一些使用场呆中非常有用,比如日志、身份验证、授权、性能度盘指标、跟踪以及其他一些自定义错求.
根据所拦截的 RPC
调用类型,gRPC
拦截可以分为两类,对于一元RPC
,使用一元拦截器,对于流RPC
,使用流拦截器;这些拦截器既可以用在服务端也可以用在客户端。
服务端拦截器
当客户端调用 gRPC服务的远程方法肘,通过使用服务器端拦截器,可以在执行远程方法 之前,执行一个通用的逻辑。
在服务器销, 一元拦截器拦截一元 RPC,梳拦iliV:ìI售出1拦截流 RPC. 下回来右一下服务器揣 一元拦敲器.
- 一元拦截
要实现这一点 ,需要先实现 UnaryServerlnterceptor
类型的函数, 并在创 gRPC
服务器端时将函数注册进来。 UnaryServerlnterceptor
是用于服务器端一元拦截的类型。函数定义类型如下。
func(ctx context.Context, req interface{}, info 叫JnaryServerlnfo,
hand1er UnaryHand1er) (re5p interface{}, err error)
例如:进行一元拦截,打印日志。
func orderUnaryServerlnterceptor(ctx context.Context, req interface{},
info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log.Println("======= [Server Interceptor1 ", info.FullMethod)
m, err := handler(ctx, req)
log.Printf("post proc mess: %s", m)
// 可以拦截修改返回内容
if order, ok := m.(*pb.Order); ok {
order.Price = 100
return order, err
}
return m, err
}
func main() {
....省略
s := grpc.NewServer(grpc.UnaryInterceptor(orderUnaryServerlnterceptor))
pb.RegisterOrderManagementServer(s, &server{})
if err := s.Serve(listen); err != nil {
panic(err)
}
}
- 流拦截
服务器端流拦截器会拦截 gRPC
服务器所处理的所有流RPC
。 流拦截器包括前置处理阶段和流操作拦截阶段。
要实现流拦截,需要实现Streal'1Serverlnterceptor
函数:
func (srv interface{}, ss grpc.ServerStream,info *grpc.StreamServerInfo,handler grpc.StreamHandler) error {
}
grpc.ServerStream
的包装器可以拦截 gRPC
服务发送或接收到的数据,它实现了 SendMsg
函数和 RecvMsg
函数,这两个函数分别会在服务发送和接收 RPC
流消息的时候调用。
在流 RPC
进入服务前,可以通过 grpc.ServeStream
进行流的处理,再通过 grpc.StreamHandler
来进行调用。
例如:在 OrderManager
的流接口中先进行输入流的处理,再调用 RPC
.
type wrappedSteam struct {
grpc.ServerStream
}
var _ grpc.ServerStream = &wrappedSteam{}
func (w *wrappedSteam) RecvMsg(m interface{}) error {
log.Printf("=====RecvMSg==== %T", m)
return w.ServerStream.RecvMsg(m)
}
func (w *wrappedSteam) SendMsg(m interface{}) error {
log.Println("====SendMsg====", m)
return w.ServerStream.SendMsg(m)
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedSteam{s}
}
func orderServerStreamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
log.Println("===== [Server Steam Interceptor ]", info.FullMethod)
err := handler(srv, newWrappedStream(ss))
if err != nil {
log.Fatalf("RPC failed with error %v", err)
}
return err
}
func main() {
...
s := grpc.NewServer(grpc.StreamInterceptor(orderServerStreamInterceptor))
pb.RegisterOrderManagementServer(s, &server{})
if err := s.Serve(listen); err != nil {
panic(err)
}
}
在流开始 时会进入 orderServerStreamInterceptor
方法,流结束后才会推出,在流每次 RecvMsg
时会进入到流拦截的 (w *wrappedSteam) RecvMsg
方法中,SendMsg
同理。
客户端拦截器
- 一元拦截
一元拦截的函数签名:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
与服务端拦截一元拦截器一样,客户端一元拦截器也有不同的阶段。
例子:
func orderClientInterceptor(ctx context.Context, method string, req, apply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println("Method :" + method)
start := time.Now()
err := invoker(ctx, method, req, apply, cc, opts...)
log.Printf("cost is [%d]ms",time.Now().Sub(start).Milliseconds())
return err
}
func main(){
conn, err := grpc.Dial(address, grpc.WithInsecure(),grpc.WithUnaryInterceptor(orderClientInterceptor))
}
- 流拦截
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
例子:
type wrappedSteam struct {
grpc.ClientStream
}
func (w *wrappedSteam) RecvMsg(m interface{}) error {
log.Printf("===== [ Client Stream Interceptor ] Receive a message (Type :%T) at %v", m, time.Now().Format(time.RFC3339))
return w.ClientStream.RecvMsg(m)
}
func (w *wrappedSteam) SendMsg(m interface{}) error {
log.Printf("===== [Client Stream Interceptor ] Send a message (Type :%T) at %v",m, time.Now().Format(time.RFC3339))
return w.ClientStream.SendMsg(m)
}
func newWrappedStream(cs grpc.ClientStream) grpc.ClientStream {
return &wrappedSteam{cs}
}
func orderClientStreamInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println("===== [Client Interceptor] ",method)
s, err := streamer(ctx, desc, cc, method, opts...)
return newWrappedStream(s), err
}
func main(){
conn, err := grpc.Dial(address, grpc.WithInsecure(),grpc.WithStreamInterceptor(orderClientStreamInterceptor))
}